package com.atguigu.watermark.generate;

import com.atguigu.bean.WaterSensor;
import com.atguigu.function.WaterSensorMapFunction;
import com.atguigu.watermark.generate.MyPeriodWatermarkGenerator;
import com.atguigu.watermark.generate.MyPuntuatedWatermarkGenerator;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * Demo job showing how to plug a user-defined watermark generator into a
 * Flink event-time pipeline.
 *
 * <p>The job reads text lines from a socket, maps them to {@link WaterSensor}
 * records, assigns timestamps and watermarks with a custom
 * {@link WatermarkStrategy}, groups the records by sensor id into 10-second
 * tumbling event-time windows and prints one summary line per fired window.
 */
public class WatermarkCustomDemo {

    /** Pattern used to render window boundaries in the printed summary. */
    private static final String WINDOW_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Customize the period at which periodic watermarks are emitted.
        // Flink's default period is 200 ms; here it is raised to 2000 ms.
        env.getConfig().setAutoWatermarkInterval(2000);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("10.75.186.206", 9999)
                .map(new WaterSensorMapFunction());

        // Step 1: define the watermark strategy.
        WatermarkStrategy<WaterSensor> watermarkStrategy = WatermarkStrategy
                // Custom periodic generator.
                // NOTE(review): 3000L is presumably the tolerated out-of-orderness in
                // milliseconds -- confirm against MyPeriodWatermarkGenerator.
                .forGenerator(st -> new MyPeriodWatermarkGenerator<WaterSensor>(3000L))
                // Alternative: custom punctuated generator. Swap it in for the
                // forGenerator(...) call above to try it.
//                .<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
//                    @Override
//                    public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
//                        return new MyPuntuatedWatermarkGenerator<WaterSensor>(3000L);
//                    }
//                })
                // Timestamp assigner: extract the event time from the record itself.
                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                    @Override
                    public long extractTimestamp(WaterSensor waterSensor, long recordTimestamp) {
                        System.out.println("数据=" + waterSensor + ",recordTs=" + recordTimestamp);
                        // The returned timestamp must be in milliseconds.
                        // NOTE(review): getTs() is presumably in seconds -- confirm against WaterSensor.
                        return waterSensor.getTs() * 1000L;
                    }
                });

        // Step 2: attach the watermark strategy to the stream.
        sensorDS.assignTimestampsAndWatermarks(watermarkStrategy)
                .keyBy(WaterSensor::getId)
                // Step 3: use a window with event-time semantics.
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(
                        new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {

                            @Override
                            public void process(String s, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                                TimeWindow window = context.window();
                                String windowStart = DateFormatUtils.format(window.getStart(), WINDOW_TIME_PATTERN);
                                String windowEnd = DateFormatUtils.format(window.getEnd(), WINDOW_TIME_PATTERN);

                                // Count by iterating: spliterator().estimateSize() is only an
                                // estimate and yields Long.MAX_VALUE for Iterables of unknown size.
                                long count = countElements(elements);

                                out.collect("key=" + s + "的窗口[" + windowStart + "," + windowEnd + ")包含" + count + "条数据===>" + elements);
                            }
                        }
                )
                .print();


        env.execute();
    }

    /**
     * Returns the exact number of elements produced by one pass over {@code elements}.
     *
     * @param elements the elements to count
     * @return the number of elements iterated
     */
    private static long countElements(Iterable<?> elements) {
        long count = 0L;
        for (Object ignored : elements) {
            count++;
        }
        return count;
    }

}
